package com.tpvlog.dfs.namenode.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tpvlog.dfs.namenode.file.FSNameSystem;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/**
 * Edits Log日志复制组件
 */
public class EditLogReplicator {

    private static final Integer BACKUP_NODE_FETCH_SIZE = 10;

    private FSNameSystem namesystem;

    // 缓存的一部分edits log
    private List<String> currentBufferedEditsLog = new ArrayList<String>();

    // 缓存的edits log中的最大txid
    private Long currentBufferedMaxTxid = 0L;

    // 缓存对应的磁盘文件
    private String bufferedFlushedTxid;

    public EditLogReplicator(FSNameSystem nameSystem) {
        this.namesystem = nameSystem;
    }

    /**
     * 从缓存或磁盘读取Edits Log日志
     */
    public List<String> fetchEditsLog(long syncedTxid) {

        // 1.获取已刷入磁盘的日志ID索引
        List<String> flushedTxids = namesystem.getEditsLog().getFlushedTxids();

        // 2.1 日志还没落磁盘
        if (flushedTxids.size() == 0) {
            System.out.println("暂时没有任何磁盘文件，直接从内存缓冲中拉取edits log......");
            return fetchFromBufferedEditsLog(syncedTxid);
        }
        // 2.2 日志已经落磁盘
        else {
            // 2.2.1 缓存中已经有从磁盘读取的数据
            if (bufferedFlushedTxid != null) {
                // 如果要拉取的数据就在当前缓存的磁盘文件数据里
                if (existInFlushedFile(syncedTxid, bufferedFlushedTxid)) {
                    System.out.println("上一次已经缓存过磁盘文件的数据，直接从磁盘文件缓存中拉取editslog......");
                    return fetchFromCurrentBuffer(syncedTxid);
                } else {
                    String nextFlushedTxid = getNextFlushedTxid(flushedTxids, bufferedFlushedTxid);
                    if (nextFlushedTxid != null) {
                        System.out.println("上一次缓存的磁盘文件找不到要拉取的数据，从下一个磁盘文件中拉取editslog......");
                        return fetchFromFlushedFile(syncedTxid, nextFlushedTxid);
                    }
                    // 如果没有找到下一个文件，此时就需要从Edits Log缓冲区读取
                    else {
                        System.out.println("上一次缓存的磁盘文件找不到要拉取的数据，而且没有下一个磁盘文件，尝试从内存缓冲中拉取editslog......");
                        return fetchFromBufferedEditsLog(syncedTxid);
                    }
                }
            }
            // 2.2.2 第一次尝试从磁盘读取
            else {
                List<String> result = null;
                boolean fechedFromFlushedFile = false;
                for (String flushedTxid : flushedTxids) {
                    // 如果要拉取的下一条数据就是在某个磁盘文件里
                    if (existInFlushedFile(syncedTxid, flushedTxid)) {
                        System.out.println("尝试从磁盘文件中拉取editslog，flushedTxid=" + flushedTxid);
                        result = fetchFromFlushedFile(syncedTxid, flushedTxid);
                        fechedFromFlushedFile = true;
                        break;
                    }
                }

                // 执行到这里，磁盘中没找到，说明你要拉取的txid已经比磁盘文件里的全部都新了，只能从Edits Log中去找
                if (!fechedFromFlushedFile) {
                    System.out.println("所有磁盘文件都没找到要拉取的editslog，尝试直接从内存缓冲中拉取editslog......");
                    result = fetchFromBufferedEditsLog(syncedTxid);
                }
                return result;
            }
        }
    }


    /**
     * 尝试从缓存中获取日志
     */
    private List<String> fetchFromBufferedEditsLog(long syncedTxid) {
        // fetchTxid表示当前要拉取的日志ID
        long fetchTxid = syncedTxid + 1;
        // 1.缓存中有数据
        if (fetchTxid <= currentBufferedMaxTxid) {
            System.out.println("尝试从内存缓冲拉取时，发现上一次内存缓存有数据可供拉取......");
            return fetchFromCurrentBuffer(syncedTxid);
        }
        // 2.缓存中没有数据，从Edits Log加载到缓存
        currentBufferedEditsLog.clear();

        // 从Edits Log双缓冲区读取数据并缓存
        String[] bufferedEditsLog = namesystem.getEditsLog().getBufferedEditsLog();
        if (bufferedEditsLog != null) {
            for (String editsLog : bufferedEditsLog) {
                currentBufferedEditsLog.add(editsLog);
                currentBufferedMaxTxid = JSONObject.parseObject(editsLog).getLongValue("txid");
            }
            bufferedFlushedTxid = null;
            // 再次从缓存中读取
            return fetchFromCurrentBuffer(syncedTxid);
        }
        return new ArrayList<String>(1);
    }

    /**
     * 从缓存中读取edits log
     */
    private List<String> fetchFromCurrentBuffer(long syncedTxid) {
        List<String> result = new ArrayList<String>();
        int fetchCount = 0;
        long fetchTxid = syncedTxid + 1;
        for (int i = 0; i < currentBufferedEditsLog.size(); i++) {
            String log = currentBufferedEditsLog.get(i);
            Long txid = JSON.parseObject(log).getLong("txid");

            if (txid.equals(fetchTxid)) {
                result.add(log);
                fetchTxid++;
                fetchCount++;
            }
            if (fetchCount == BACKUP_NODE_FETCH_SIZE) {
                break;
            }
        }

        return result;
    }

    /**
     * 获取下一个磁盘文件对应的txid范围
     */
    private String getNextFlushedTxid(List<String> flushedTxids, String bufferedFlushedTxid) {
        for (int i = 0; i < flushedTxids.size(); i++) {
            if (flushedTxids.get(i).equals(bufferedFlushedTxid)) {
                if (i + 1 < flushedTxids.size()) {
                    return flushedTxids.get(i + 1);
                }
            }
        }
        return null;
    }

    /**
     * 是否存在于刷到磁盘的文件中
     */
    private Boolean existInFlushedFile(Long syncedTxid, String flushedTxid) {
        String[] flushedTxidSplited = flushedTxid.split("_");

        long startTxid = Long.valueOf(flushedTxidSplited[0]);
        long endTxid = Long.valueOf(flushedTxidSplited[1]);
        // fetchTxid表示当前要拉取的日志ID
        long fetchTxid = syncedTxid + 1;

        if (fetchTxid >= startTxid && fetchTxid <= endTxid) {
            return true;
        }

        return false;
    }

    /**
     * 从已经刷入磁盘的文件里读取edits log，同时缓存这个文件数据到内存
     */
    private List<String> fetchFromFlushedFile(Long syncedTxid, String flushedTxid) {
        try {
            String[] flushedTxidSplited = flushedTxid.split("_");
            long startTxid = Long.valueOf(flushedTxidSplited[0]);
            long endTxid = Long.valueOf(flushedTxidSplited[1]);

            String currentEditsLogFile = "C:\\Users\\ressmix\\Desktop\\edits-"
                    + startTxid + "-" + endTxid + ".log";
            List<String> editsLogs = Files.readAllLines(Paths.get(currentEditsLogFile), StandardCharsets.UTF_8);

            currentBufferedEditsLog.clear();
            for (String editsLog : editsLogs) {
                currentBufferedEditsLog.add(editsLog);
                currentBufferedMaxTxid = JSONObject.parseObject(editsLog).getLongValue("txid");
            }
            // 缓存了某个刷入磁盘文件的数据
            bufferedFlushedTxid = flushedTxid;
            return fetchFromCurrentBuffer(syncedTxid);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return new ArrayList<>(1);
    }
}
